package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.UserLoginBean;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.KafkaUtil;
import com.atguigu.utils.MyClickHouseUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

//数据流：web/app -> 日志服务器(log文件) -> flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> ClickHouse(DWS)
//程  序：Mock -> 文件 -> f1.sh -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK) -> Dws04UserUserLoginWindow -> ClickHouse(ZK)
public class Dws04UserUserLoginWindow {

    public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 需要从Checkpoint或者Savepoint启动程序
        //2.1 开启Checkpoint,每隔5秒钟做一次CK  ,并指定CK的一致性语义
        //env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 设置超时时间为 1 分钟
        //env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // 2.3 设置两次重启的最小时间间隔
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 2.5 指定从 CK 自动重启策略
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(
        //        3, Time.days(1L), Time.minutes(1L)
        //));
        // 2.6 设置状态后端
        //env.setStateBackend(new EmbeddedRocksDBStateBackend(true) );
        //env.getCheckpointConfig().setCheckpointStorage(
        //      "hdfs://hadoop102:8020/flinkCDC"
        //);
        // 2.7 设置访问HDFS的用户名
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2.读取Kafka 页面日志主题数据创建流
        String topic = "dwd_traffic_page_log";
        String groupId = "user_login_220718";
        DataStreamSource<String> kafkaDS = env.addSource(KafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        //TODO 3.将数据转换为JSON对象
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        //TODO 4.提取时间戳生成WaterMark
        SingleOutputStreamOperator<JSONObject> jsonObjWithWMDS = jsonObjDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
            @Override
            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                return element.getLong("ts");
            }
        }));

        //TODO 5.过滤数据
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjWithWMDS.filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {

                String uid = value.getJSONObject("common").getString("uid");
                String lastPageId = value.getJSONObject("page").getString("last_page_id");

                return uid != null && (lastPageId == null || "login".equals(lastPageId));
            }
        });

        //TODO 6.按照Uid进行分组
        KeyedStream<JSONObject, String> keyedByUidStream = filterDS.keyBy(json -> json.getJSONObject("common").getString("uid"));

        //TODO 7.去重数据转换为JavaBean对象
        SingleOutputStreamOperator<UserLoginBean> userLoginBeanDS = keyedByUidStream.flatMap(new RichFlatMapFunction<JSONObject, UserLoginBean>() {

            private ValueState<String> lastVisitState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-state", String.class));
            }

            @Override
            public void flatMap(JSONObject value, Collector<UserLoginBean> out) throws Exception {

                //获取状态中的数据以及当前数据中的日期
                String lastDt = lastVisitState.value();
                String curDt = DateFormatUtil.toDate(value.getLong("ts"));

                //定义当天用户登录数以及回流用户数
                long uv = 0L;
                long backCt = 0L;

                if (lastDt == null) {
                    uv = 1L;
                    lastVisitState.update(curDt);
                } else if (lastDt.compareTo(curDt) < 0) {
                    uv = 1L;
                    lastVisitState.update(curDt);

                    //判断是否为回流用户
                    if ((DateFormatUtil.toTs(curDt) - DateFormatUtil.toTs(lastDt)) / (24 * 3600 * 1000L) > 7) {
                        backCt = 1L;
                    }
                }

                if (uv == 1L) {
                    out.collect(new UserLoginBean("", "", backCt, uv, 0L));
                }
            }
        });

        //TODO 8.开窗聚合
        SingleOutputStreamOperator<UserLoginBean> resultDS = userLoginBeanDS.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<UserLoginBean>() {
                    @Override
                    public UserLoginBean reduce(UserLoginBean value1, UserLoginBean value2) throws Exception {
                        value1.setUuCt(value1.getUuCt() + value2.getUuCt());
                        value1.setBackCt(value1.getBackCt() + value2.getBackCt());
                        return value1;
                    }
                }, new AllWindowFunction<UserLoginBean, UserLoginBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<UserLoginBean> values, Collector<UserLoginBean> out) throws Exception {
                        //获取数据
                        UserLoginBean userLoginBean = values.iterator().next();

                        //补充信息
                        userLoginBean.setTs(System.currentTimeMillis());
                        userLoginBean.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        userLoginBean.setStt(DateFormatUtil.toYmdHms(window.getStart()));

                        //输出数据
                        out.collect(userLoginBean);
                    }
                });

        //TODO 9.将数据写出到ClickHouse
        resultDS.print(">>>>>>>");
        resultDS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_user_user_login_window values(?,?,?,?,?)"));

        //TODO 10.启动任务
        env.execute("Dws04UserUserLoginWindow");

    }

}
